package com.danan.data_loader.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.danan.data_loader.common.Database;
import com.danan.data_loader.mapper.TargetMapper;
import com.danan.data_loader.model.DataModel;
import com.danan.data_loader.util.ConfigUtil;
import com.danan.data_loader.util.ConnectionUtil;
import com.danan.data_loader.util.SqlSessionFactoryUtil;
import com.danan.data_loader.util.metadata.impl.oracle.OracleColumnMetaData;
import com.danan.data_loader.util.metadata.impl.oracle.OracleTableMetaData;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;

import java.sql.Connection;
import java.util.*;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: NanHuang
 * @Date: 2023/05/15/9:11
 * @Description:
 */
@Slf4j
public class OracleSinkFunction extends RichSinkFunction<String> {
    private final Map<String, List<String>> primaryKeyMap = new HashMap<>();
    private TargetMapper targetMapper;
    private final String schema;

    public OracleSinkFunction(String schema) {
        this.schema = schema;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 1 获取所有表的主键信息
        getPrimaryKeys();
        // 2 MyBatis初始化
        SqlSessionFactory sqlSessionFactory = SqlSessionFactoryUtil.getSqlSessionFactory();
        SqlSession sqlSession = sqlSessionFactory.openSession(true);
        targetMapper = sqlSession.getMapper(TargetMapper.class);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
//        System.out.println(value);

        JSONObject obj = JSON.parseObject(value);
        String table = obj.getString("table");
        String type = obj.getString("type");
        JSONObject after = obj.getJSONObject("after");
        JSONObject before = obj.getJSONObject("before");

        // 处理after

        switch (type) {
            case "c":
            case "r":
//                log.info(" *** 表{}插入数据 ***", schema + "." + table);
                try {
                    targetMapper.insert(schema, table, getColumns(after).f0, getColumns(after).f1);
                } catch (Exception e) {
                    log.info(e.getMessage());
                }
                break;
//            case "r":
//                // 历史数据处理
//                processHistoryData(schema, table, after);
//                break;
            case "u":
                ArrayList<String> afterValues = getColumnsAndValues(before, after);
                if (afterValues.size() != 0) {
                    // 前后数据不一致再修改，相同就不修改了
                    targetMapper.update(schema, table, getColumnsAndValues(before, schema, table, type), afterValues);
                }
                break;
            case "d":
                targetMapper.delete(schema, table, getColumnsAndValues(before, schema, table, type));
                break;
        }
    }

    /**
     * @Description 过滤出修改的数据
     * @Param [beforeObj, afterObj]
     **/
    private ArrayList<String> getColumnsAndValues(JSONObject beforeObj, JSONObject afterObj) {
        ArrayList<String> beforeColumnsAndValues = getColumnsAndValues(beforeObj);
        ArrayList<String> afterColumnsAndValues = getColumnsAndValues(afterObj);
        afterColumnsAndValues.removeIf(beforeColumnsAndValues::contains);
        return afterColumnsAndValues;
    }

    /**
     * @Description 获取筛选条件
     * @Param [beforeObj, table, schema]
     **/
    private ArrayList<String> getColumnsAndValues(JSONObject beforeObj, String schema, String table, String type) throws Exception {
        List<String> primaryKeys = primaryKeyMap.get(schema + "." + table);
        ArrayList<String> result = new ArrayList<>();
        String op = "u".equals(type) ? "修改" : "删除";
        if (primaryKeys.size() != 0) {
            log.info("表{}根据主键筛选，并{}数据", schema + "." + table, op);
            for (String primaryKey : primaryKeys) {
                result.add(primaryKey + "=" + beforeObj.getString(primaryKey));
            }
        } else {
//            List<String> newKeys = getPrimaryKeys(schema, table);
//            if (newKeys.size() != 0) {
//                log.info("表{}根据主键筛选，并{}数据", schema + "." + table, op);
//                primaryKeyMap.put(schema + "." + table, newKeys);
//                for (String newKey : newKeys) {
//                    result.add(newKey + "=" + beforeObj.getString(newKey));
//                }
//            } else {
//                log.info("表{}根据所有旧数据筛选，并{}数据", schema + "." + table, op);
//                result = getColumnsAndValues(beforeObj);
//                result.removeIf(s -> s.contains("=null"));
//            }
            log.info("表{}根据所有旧数据筛选，并{}数据", schema + "." + table, op);
            result = getColumnsAndValues(beforeObj);
            result.removeIf(s -> s.contains("=null"));
        }
        return result;
    }

    private ArrayList<String> getColumnsAndValues(JSONObject obj) {
        Set<String> columns = obj.keySet();
        ArrayList<String> result = new ArrayList<>();
        for (String column : columns) {
            result.add(column + "=" + obj.getString(column));
        }
        return result;
    }

    private ArrayList<String> getValues(JSONObject obj) {
        Set<String> columns = obj.keySet();
        ArrayList<String> result = new ArrayList<>();
        for (String column : columns) {
            result.add(obj.getString(column));
        }
        return result;
    }

    private Tuple2<ArrayList<String>, ArrayList<String>> getColumns(JSONObject obj) {
        Set<String> columns = obj.keySet();
        ArrayList<String> keys = new ArrayList<>();
        ArrayList<String> values = new ArrayList<>();
        for (String column : columns) {
            String value = obj.getString(column);
            if (!"null".equals(value)) {
                // 只插入值不为null
                keys.add(column);
                values.add(value);
            }
        }
        return new Tuple2<>(keys, values);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * @Description 获取指定表的主键信息
     * @Param [schema, table]
     **/
    private List<String> getPrimaryKeys(String schema, String table) throws Exception {
        String type = ConfigUtil.getProperty("data.source.type");
        Database DB = Database.valueOf(type.toUpperCase());
        Connection connection = ConnectionUtil.get(
                DB.driverClass,
                String.format(DB.urlTemplate, ConfigUtil.getProperty("data.source.hostname"), ConfigUtil.getProperty("data.source.port"), ConfigUtil.getProperty("data.source.database")),
                ConfigUtil.getProperty("data.source.username"),
                ConfigUtil.getProperty("data.source.password")
        );
        List<String> primaryKeys = new OracleColumnMetaData().getPrimaryKeys(connection, schema, table);
        connection.close();
        return primaryKeys;
    }

    /**
     * @Description 获取所有表的主键信息
     * @Param []
     **/
    private void getPrimaryKeys() throws Exception {
        String type = ConfigUtil.getProperty("data.source.type");
        Database DB = Database.valueOf(type.toUpperCase());
        Connection connection = ConnectionUtil.get(
                DB.driverClass,
                String.format(DB.urlTemplate, ConfigUtil.getProperty("data.source.hostname"), ConfigUtil.getProperty("data.source.port"), ConfigUtil.getProperty("data.source.database")),
                ConfigUtil.getProperty("data.source.username"),
                ConfigUtil.getProperty("data.source.password")
        );
        List<String> tables = new OracleTableMetaData().getMetaData(connection, schema);
        for (String table : tables) {
            primaryKeyMap.put(schema + "." + table, new OracleColumnMetaData().getPrimaryKeys(connection, schema, table));
        }
        connection.close();
    }
}
